Source code for hysop.backend.host.python.operator.spatial_filtering

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import numpy as np
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.decorators import debug
from hysop.backend.host.host_operator import HostOperator
from hysop.core.graph.graph import op_apply
from hysop.fields.continuous_field import Field
from hysop.parameters.parameter import Parameter
from hysop.topology.cartesian_descriptor import CartesianTopologyDescriptors
from hysop.operator.base.spatial_filtering import (
    PolynomialInterpolationFilterBase,
    RemeshRestrictionFilterBase,
    SpectralRestrictionFilterBase,
    SubgridRestrictionFilterBase,
    PolynomialRestrictionFilterBase,
)


[docs] class PythonPolynomialInterpolationFilter( PolynomialInterpolationFilterBase, HostOperator ):
[docs] def discretize(self, **kwds): if self.discretized: return super().discretize(**kwds) self.Wr = self.subgrid_interpolator.Wr.astype(self.dtype)
@op_apply def apply(self, **kwds): """Apply analytic formula.""" super().apply(**kwds) fin = self.fin fout = self.fout periodicity = self.dFin.periodicity gr, n = self.subgrid_interpolator.gr, self.subgrid_interpolator.n Wr = self.Wr for idx in np.ndindex(*self.iter_shape): oslc = tuple( slice(j * gr[i], (j + 1) * gr[i], 1) for i, j in enumerate(idx) ) islc = tuple( slice(periodicity[i] + j, periodicity[i] + j + n[i], 1) for i, j in enumerate(idx) ) fout[oslc] = Wr.dot(fin[islc].ravel()).reshape(gr) self.dFout.exchange_ghosts()
[docs] class PythonPolynomialRestrictionFilter(PolynomialRestrictionFilterBase, HostOperator):
[docs] def discretize(self, **kwds): if self.discretized: return super().discretize(**kwds) SR = self.subgrid_restrictor self.Rr = SR.Rr.astype(self.dtype) / SR.GR assert self.Rr.shape == tuple(2 * gi + 1 for gi in SR.ghosts), self.Rr.shape
@op_apply def apply(self, **kwds): """Apply analytic formula.""" super().apply(**kwds) fin = self.fin fout = self.fout gr = self.subgrid_restrictor.gr Rr = self.Rr rshape = Rr.shape for idx in np.ndindex(*self.iter_shape): islc = tuple( slice(j * gr[i], j * gr[i] + rshape[i], 1) for i, j in enumerate(idx) ) fout[idx] = (Rr * fin[islc]).sum() self.dFout.exchange_ghosts()
[docs] class PythonRemeshRestrictionFilter(RemeshRestrictionFilterBase, HostOperator): """ Python implementation for lowpass spatial filtering: small grid -> coarse grid using remeshing kernels. """
[docs] def setup(self, **kwds): super().setup(**kwds) fin = self.fin iratio = self.iratio oshape = self.fout.shape dviews = () for idx, Wi in self.nz_weights.items(): slc = tuple( slice(i, i + r * s, r) for (i, s, r) in zip(idx, oshape, iratio) ) dviews += ((Wi, fin[slc]),) self.data_views = dviews
@op_apply def apply(self, **kwds): """Apply analytic formula.""" super().apply(**kwds) fin, fout = self.fin, self.fout fout[...] = 0 for Wi, iview in self.data_views: fout[...] += Wi * iview self.dFout.exchange_ghosts()
[docs] class PythonSpectralRestrictionFilter(SpectralRestrictionFilterBase, HostOperator): """ Python implementation for lowpass spatial filtering: small grid -> coarse grid using the spectral method. """ @op_apply def apply(self, simulation, **kwds): """Apply spectral filter (which is just a square window centered on low frequencies).""" super().apply(**kwds) self.Ft(simulation=simulation) for i, (src_slc, dst_slc) in enumerate(zip(*self.fslices)): self.FOUT[dst_slc] = self.FIN[src_slc] self.FOUT[...] *= self.scaling self.Bt(simulation=simulation) self.dFout.exchange_ghosts() def _compute_scaling_coefficient(self): self.Ft.input_buffer[...] = 1 self.Ft(simulation=False) for i, (src_slc, dst_slc) in enumerate(zip(*self.fslices)): self.FOUT[dst_slc] = self.FIN[src_slc] self.Bt(simulation=False) scaling = 1.0 / self.Bt.output_buffer[(0,) * self.FOUT.ndim] return scaling
[docs] class PythonSubgridRestrictionFilter(SubgridRestrictionFilterBase, HostOperator): """ Python implementation for lowpass spatial filtering: small grid -> coarse grid byt just taking subpoints. """ @op_apply def apply(self, simulation, **kwds): """Apply subgrid filter.""" super().apply(**kwds) self.fout[...] = self.fin[...] self.dFout.exchange_ghosts()